#!/usr/bin/env python
#
# Copyright (C) 2011, 2012 Igalia S.L.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Library General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Library General Public License for more details.
#
# You should have received a copy of the GNU Library General Public License
# along with this library; see the file COPYING.LIB.  If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
# Boston, MA 02110-1301, USA.

import errno
import optparse
import os
import re
import subprocess
import sys
from signal import alarm, signal, SIGALRM, SIGKILL

from gi.repository import Gio, GLib

# The top-level directory is computed as two levels above the directory that
# contains this script. Its Tools/jhbuild and Tools/gtk sub-directories are
# appended to sys.path before the imports below (presumably that is where the
# 'common' and 'jhbuildutils' helper modules live -- verify against the tree).
top_level_directory = os.path.normpath(os.path.join(os.path.dirname(__file__), "..", ".."))
sys.path.append(os.path.join(top_level_directory, "Tools", "jhbuild"))
sys.path.append(os.path.join(top_level_directory, "Tools", "gtk"))
import common
import jhbuildutils

class SkippedTest:
    ENTIRE_SUITE = None

    def __init__(self, test, test_case, reason, bug=None):
        self.test = test
        self.test_case = test_case
        self.reason = reason
        self.bug = bug

    def __str__(self):
        skipped_test_str = "%s" % self.test

        if not(self.skip_entire_suite()):
            skipped_test_str += " [%s]" % self.test_case

        skipped_test_str += ": %s " % self.reason
        if self.bug is not None:
            skipped_test_str += "(https://bugs.webkit.org/show_bug.cgi?id=%d)" % self.bug
        return skipped_test_str

    def skip_entire_suite(self):
        return self.test_case == SkippedTest.ENTIRE_SUITE

class TestTimeout(Exception):
    """Signals that a test program ran longer than its allowed time."""

class TestRunner:
    """Runs the API test programs found in the build directory and reports results.

    The runner starts Xvfb on the requested display, tries to start the AT-SPI
    accessibility daemons, runs every selected test program (through gtester or
    directly, depending on the directory the program lives in) with a per-program
    timeout, and finally prints a summary of failed, timed out and skipped tests.
    """

    # Sub-directories of the "Programs" build directory that are scanned for tests.
    TEST_DIRS = [ "unittests", "WebKit2APITests", "TestWebKitAPI" ]

    # Tests that are known to fail, time out or be flaky. This list is shared by
    # all instances; each runner works on its own copy (see __init__).
    SKIPPED = [
        SkippedTest("unittests/testdownload", "/webkit/download/not-found", "Test fails in GTK Linux 64-bit Release bot", 82329),
        SkippedTest("unittests/testwebinspector", "/webkit/webinspector/close-and-inspect", "Test is flaky in GTK Linux 32-bit Release bot", 82869),
        SkippedTest("unittests/testwebresource", "/webkit/webresource/loading", "Test fails", 104689),
        SkippedTest("unittests/testwebresource", "/webkit/webresource/sub_resource_loading", "Test fails in GTK Linux 64-bit Release bot", 82330),
        SkippedTest("unittests/testwebview", "/webkit/webview/icon-uri", "Test times out in GTK Linux 64-bit Release bot", 82328),
        SkippedTest("unittests/testatk", "/webkit/atk/getTextInParagraphAndBodyModerate", "Test fails", 105538),
        SkippedTest("WebKit2APITests/TestInspectorServer", SkippedTest.ENTIRE_SUITE, "Test times out", 105866),
        SkippedTest("WebKit2APITests/TestResources", "/webkit2/WebKitWebView/resources", "Test is flaky in GTK Linux 32-bit Release bot", 82868),
        SkippedTest("WebKit2APITests/TestWebKitAccessibility", "/webkit2/WebKitAccessibility/atspi-basic-hierarchy", "Test fails", 100408),
        SkippedTest("WebKit2APITests/TestWebKitWebView", SkippedTest.ENTIRE_SUITE, "Test times out after r150890", 117689),
        SkippedTest("WebKit2APITests/TestContextMenu", SkippedTest.ENTIRE_SUITE, "Test times out after r150890", 117689),
        SkippedTest("WebKit2APITests/TestWebKitFaviconDatabase", SkippedTest.ENTIRE_SUITE, "Test times out", 117690),
        SkippedTest("TestWebKitAPI/TestWebKit2", "WebKit2.CanHandleRequest", "Test fails", 88453),
        SkippedTest("TestWebKitAPI/TestWebKit2", "WebKit2.MouseMoveAfterCrash", "Test is flaky", 85066),
        SkippedTest("TestWebKitAPI/TestWebKit2", "WebKit2.NewFirstVisuallyNonEmptyLayoutForImages", "Test is flaky", 85066),
        SkippedTest("TestWebKitAPI/TestWebKit2", "WebKit2.NewFirstVisuallyNonEmptyLayoutFrames", "Test fails", 85037),
        SkippedTest("TestWebKitAPI/TestWebKit2", "WebKit2.RestoreSessionStateContainingFormData", "Session State is not implemented in GTK+ port", 84960),
        SkippedTest("TestWebKitAPI/TestWebKit2", "WebKit2.SpacebarScrolling", "Test fails", 84961),
        SkippedTest("TestWebKitAPI/TestWebKit2", "WebKit2.WKConnection", "Tests fail and time out out", 84959),
        SkippedTest("TestWebKitAPI/TestWebKit2", "WebKit2.WKPageGetScaleFactorNotZero", "Test fails and times out", 88455),
        SkippedTest("TestWebKitAPI/TestWebKit2", "WebKit2.ForceRepaint", "Test times out", 105532),
        SkippedTest("TestWebKitAPI/TestWebKit2", "WebKit2.ReloadPageAfterCrash", "Test flakily times out", 110129),
    ]

    def __init__(self, options, tests=None):
        """Prepare a runner.

        Args:
            options: parsed command-line options; the attributes read by this
                class are debug, display, verbose, skipped_action and timeout.
            tests: optional list of test program paths. When empty or None the
                programs are discovered in the build directory.
        """
        self._options = options
        self._build_type = "Debug" if self._options.debug else "Release"

        self._programs_path = common.build_path_for_build_types((self._build_type,), "Programs")
        self._tests = self._get_tests(tests)
        # Work on a copy: setting up the environment may append entries, and
        # that must not leak into the class-level list.
        self._skipped_tests = list(TestRunner.SKIPPED)
        # Matches terminal color escape sequences; they are stripped from the
        # test output whenever stdout is not a terminal.
        self._tty_colors_pattern = re.compile("\033\\[[0-9;]*m")

        # Filled in by _setup_testing_environment().
        self._test_env = None
        self._xvfb = None

        # These SPI daemons need to be active for the accessibility tests to work.
        self._spi_registryd = None
        self._spi_bus_launcher = None

    def _get_tests(self, tests):
        """Return the list of test programs to run.

        A non-empty list given by the caller is returned as is. Otherwise every
        executable regular file whose name starts with "test" (in any letter
        case) inside the TEST_DIRS sub-directories of the programs path is
        collected; missing sub-directories are ignored.
        """
        if tests:
            return tests

        tests = []
        for test_dir in self.TEST_DIRS:
            absolute_test_dir = os.path.join(self._programs_path, test_dir)
            if not os.path.isdir(absolute_test_dir):
                continue
            for test_file in os.listdir(absolute_test_dir):
                if not test_file.lower().startswith("test"):
                    continue
                test_path = os.path.join(absolute_test_dir, test_file)
                if os.path.isfile(test_path) and os.access(test_path, os.X_OK):
                    tests.append(test_path)
        return tests

    def _lookup_atspi2_binary(self, filename):
        """Return the path of an AT-SPI2 helper binary, or None if not found.

        The search is rooted at the 'exec_prefix' value that
        common.pkg_config_file_variable() reports for the 'atspi-2' package.
        """
        exec_prefix = common.pkg_config_file_variable('atspi-2', 'exec_prefix')
        if not exec_prefix:
            return None
        for path in ['libexec', 'lib/at-spi2-core', 'lib32/at-spi2-core', 'lib64/at-spi2-core']:
            filepath = os.path.join(exec_prefix, path, filename)
            if os.path.isfile(filepath):
                return filepath

        return None

    def _start_accessibility_daemons(self):
        """Start the accessibility bus launcher and registry daemon.

        Returns True when both processes were started, False otherwise. The
        started processes are remembered so that
        _tear_down_testing_environment() can terminate them.
        """
        spi_bus_launcher_path = self._lookup_atspi2_binary('at-spi-bus-launcher')
        spi_registryd_path = self._lookup_atspi2_binary('at-spi2-registryd')
        if not spi_bus_launcher_path or not spi_registryd_path:
            return False

        try:
            # Must be stored in _spi_bus_launcher: that is the attribute the
            # tear-down code terminates.
            self._spi_bus_launcher = subprocess.Popen([spi_bus_launcher_path], env=self._test_env)
        except Exception:
            sys.stderr.write("Failed to launch the accessibility bus\n")
            sys.stderr.flush()
            return False

        # We need to wait until the SPI bus is launched before trying to start the SPI
        # registry, so we spin a main loop until the bus name appears on DBus.
        # NOTE(review): no timeout is installed here, so this blocks until the
        # name-appeared callback fires.
        loop = GLib.MainLoop()
        Gio.bus_watch_name(Gio.BusType.SESSION, 'org.a11y.Bus', Gio.BusNameWatcherFlags.NONE,
                           lambda *args: loop.quit(), None)
        loop.run()

        try:
            self._spi_registryd = subprocess.Popen([spi_registryd_path], env=self._test_env)
        except Exception:
            sys.stderr.write("Failed to launch the accessibility registry\n")
            sys.stderr.flush()
            return False

        return True

    def _setup_testing_environment(self):
        """Set the environment variables used by the tests and start Xvfb.

        Returns False when Xvfb cannot be started, True otherwise. Failing to
        start the accessibility daemons is not fatal: the accessibility test
        program is added to the skipped list instead.
        """
        # NOTE: this is os.environ itself, not a copy, so the variables set
        # below are also visible to this process and to Xvfb, which is started
        # without an explicit environment.
        self._test_env = os.environ
        self._test_env["DISPLAY"] = self._options.display
        self._test_env["WEBKIT_INSPECTOR_PATH"] = os.path.abspath(os.path.join(self._programs_path, 'resources', 'inspector'))
        self._test_env['GSETTINGS_BACKEND'] = 'memory'
        self._test_env["TEST_WEBKIT_API_WEBKIT2_RESOURCES_PATH"] = common.top_level_path("Tools", "TestWebKitAPI", "Tests", "WebKit2")
        self._test_env["TEST_WEBKIT_API_WEBKIT2_INJECTED_BUNDLE_PATH"] = common.build_path_for_build_types((self._build_type,), "Libraries")
        self._test_env["WEBKIT_EXEC_PATH"] = self._programs_path

        try:
            self._xvfb = subprocess.Popen(["Xvfb", self._options.display, "-screen", "0", "800x600x24", "-nolisten", "tcp"],
                                          stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        except Exception as e:
            sys.stderr.write("Failed to run Xvfb: %s\n" % e)
            sys.stderr.flush()
            return False

        # If we cannot start the accessibility daemons, we can just skip the accessibility tests.
        if not self._start_accessibility_daemons():
            sys.stdout.write("Could not start accessibility bus, so skipping TestWebKitAccessibility\n")
            sys.stdout.flush()
            self._skipped_tests.append(SkippedTest("WebKit2APITests/TestWebKitAccessibility", SkippedTest.ENTIRE_SUITE, "Could not start accessibility bus"))
        return True

    def _tear_down_testing_environment(self):
        """Terminate the accessibility daemons and Xvfb, if they were started."""
        for process in (self._spi_registryd, self._spi_bus_launcher, self._xvfb):
            if process is None:
                continue
            try:
                process.terminate()
            except OSError:
                # The process is already gone; keep terminating the others.
                pass

    def _test_cases_to_skip(self, test_program):
        """Return the individual test cases of test_program that must be skipped.

        The list is empty unless the skipped action option is 'skip'. Entries
        that skip an entire program are not included here.
        """
        if self._options.skipped_action != 'skip':
            return []

        return [skipped.test_case for skipped in self._skipped_tests
                if test_program.endswith(skipped.test) and not skipped.skip_entire_suite()]

    def _should_run_test_program(self, test_program):
        """Return False when the whole test program is in the skipped list."""
        # This is not affected by the command-line arguments, since programs are skipped for
        # problems in the harness, such as failing to start the accessibility bus.
        for skipped in self._skipped_tests:
            if test_program.endswith(skipped.test) and skipped.skip_entire_suite():
                return False
        return True

    def _get_child_pid_from_test_output(self, output):
        """Extract the pid from the first "(pid=NNN)" marker in output.

        Returns -1 when output is empty or holds no such marker.
        """
        if not output:
            return -1
        match = re.search(r'\(pid=(?P<child_pid>[0-9]+)\)', output)
        if not match:
            return -1
        return int(match.group('child_pid'))

    def _kill_process(self, pid):
        """Send SIGKILL to pid, ignoring the error raised if it no longer exists."""
        try:
            os.kill(pid, SIGKILL)
        except OSError:
            # Process already died.
            pass

    def _run_test_command(self, command, timeout=-1):
        """Run command in a pseudo-terminal, echoing its output to stdout.

        Args:
            command: argument list; command[0] is looked up in PATH.
            timeout: seconds allowed before the command is killed; values <= 0
                disable the limit.

        Returns:
            True when the command exited with status 0, False otherwise.

        Raises:
            TestTimeout: the command did not finish within timeout seconds. The
                forked process, and the pid announced in its output (if any),
                have been sent SIGKILL by then.
        """
        def alarm_handler(signum, frame):
            raise TestTimeout

        # One-element list so the nested function can update it.
        child_pid = [-1]
        def parse_line(line, child_pid = child_pid):
            if child_pid[0] == -1:
                child_pid[0] = self._get_child_pid_from_test_output(line)

            if sys.stdout.isatty():
                sys.stdout.write(line)
            else:
                sys.stdout.write(self._tty_colors_pattern.sub('', line.replace('\r', '')))

        def waitpid(pid):
            # Retry when the call is interrupted by a signal.
            while True:
                try:
                    return os.waitpid(pid, 0)
                except (OSError, IOError) as e:
                    if e.errno == errno.EINTR:
                        continue
                    raise

        def return_code_from_exit_status(status):
            if os.WIFSIGNALED(status):
                return -os.WTERMSIG(status)
            elif os.WIFEXITED(status):
                return os.WEXITSTATUS(status)
            else:
                # Should never happen
                raise RuntimeError("Unknown child exit status!")

        pid, fd = os.forkpty()
        if pid == 0:
            # Child process. If exec fails we must leave immediately: letting
            # the exception unwind here would run the parent's clean-up code
            # (including tearing down Xvfb) from inside the child.
            try:
                os.execvpe(command[0], command, self._test_env)
            except Exception as e:
                sys.stderr.write("Failed to execute %r: %s\n" % (command, e))
                sys.stderr.flush()
            finally:
                os._exit(127)

        try:
            if timeout > 0:
                signal(SIGALRM, alarm_handler)
                alarm(timeout)

            try:
                common.parse_output_lines(fd, parse_line)
            except TestTimeout:
                self._kill_process(pid)
                if child_pid[0] > 0:
                    self._kill_process(child_pid[0])
                raise
            finally:
                # Never leave an alarm pending, whatever way we got here.
                if timeout > 0:
                    alarm(0)

            try:
                dummy, status = waitpid(pid)
            except OSError as e:
                if e.errno != errno.ECHILD:
                    raise
                # This happens if SIGCLD is set to be ignored or waiting
                # for child processes has otherwise been disabled for our
                # process.  This child is dead, we can't get the status.
                status = 0
        finally:
            # Release the pseudo-terminal. The error is ignored in case the
            # descriptor has already been closed.
            try:
                os.close(fd)
            except OSError:
                pass

        return not return_code_from_exit_status(status)

    def _run_test_glib(self, test_program):
        """Run test_program through gtester, skipping test cases with -s."""
        tester_command = ['gtester']
        if self._options.verbose:
            tester_command.append('--verbose')
        for test_case in self._test_cases_to_skip(test_program):
            tester_command.extend(['-s', test_case])
        tester_command.append(test_program)

        return self._run_test_command(tester_command, self._options.timeout)

    def _run_test_google(self, test_program):
        """Run test_program directly, excluding skipped cases with --gtest_filter."""
        tester_command = [test_program]
        skipped_tests_cases = self._test_cases_to_skip(test_program)
        if skipped_tests_cases:
            tester_command.append("--gtest_filter=-%s" % ":".join(skipped_tests_cases))

        return self._run_test_command(tester_command, self._options.timeout)

    def _run_test(self, test_program):
        """Run test_program with the launcher that matches its path.

        Returns the success flag of the run, or False when the path matches
        none of the known test directories.
        """
        if "unittests" in test_program or "WebKit2APITests" in test_program:
            return self._run_test_glib(test_program)

        if "TestWebKitAPI" in test_program:
            return self._run_test_google(test_program)

        return False

    def run_tests(self):
        """Run every selected test program and print a summary.

        Returns the number of programs that failed plus the number that timed
        out, or 1 when there are no tests or the environment cannot be set up.
        """
        if not self._tests:
            sys.stderr.write("ERROR: tests not found in %s.\n" % (self._programs_path))
            sys.stderr.flush()
            return 1

        if not self._setup_testing_environment():
            return 1

        # Remove skipped tests now instead of when we find them, because
        # some tests might be skipped while setting up the test environment.
        self._tests = [test for test in self._tests if self._should_run_test_program(test)]

        failed_tests = []
        timed_out_tests = []
        try:
            for test in self._tests:
                # A timed out program is only reported as timed out, not as failed.
                success = True
                try:
                    success = self._run_test(test)
                except TestTimeout:
                    sys.stdout.write("TEST: %s: TIMEOUT\n" % test)
                    sys.stdout.flush()
                    timed_out_tests.append(test)

                if not success:
                    failed_tests.append(test)
        finally:
            self._tear_down_testing_environment()

        if failed_tests:
            names = [test.replace(self._programs_path, '', 1) for test in failed_tests]
            sys.stdout.write("Tests failed (%d): %s\n" % (len(names), ", ".join(names)))
            sys.stdout.flush()

        if timed_out_tests:
            names = [test.replace(self._programs_path, '', 1) for test in timed_out_tests]
            sys.stdout.write("Tests that timed out (%d): %s\n" % (len(names), ", ".join(names)))
            sys.stdout.flush()

        if self._skipped_tests and self._options.skipped_action == 'skip':
            sys.stdout.write("Tests skipped (%d):\n%s\n" %
                             (len(self._skipped_tests),
                              "\n".join([str(skipped) for skipped in self._skipped_tests])))
            sys.stdout.flush()

        return len(failed_tests) + len(timed_out_tests)

if __name__ == "__main__":
    # Entering the jhbuild environment is optional: warn and carry on without it.
    if not jhbuildutils.enter_jhbuild_environment_if_available("gtk"):
        for banner_line in ("***",
                            "*** Warning: jhbuild environment not present. Run update-webkitgtk-libs before build-webkit to ensure proper testing.",
                            "***"):
            print(banner_line)

    # Command-line options, registered in the order they appear in --help.
    option_specs = (
        (('-r', '--release'),
         {'action': 'store_true', 'dest': 'release',
          'help': 'Run in Release'}),
        (('-d', '--debug'),
         {'action': 'store_true', 'dest': 'debug',
          'help': 'Run in Debug'}),
        (('-v', '--verbose'),
         {'action': 'store_true', 'dest': 'verbose',
          'help': 'Run gtester in verbose mode'}),
        (('--display',),
         {'action': 'store', 'dest': 'display', 'default': ':55',
          'help': 'Display to run Xvfb'}),
        (('--skipped',),
         {'action': 'store', 'dest': 'skipped_action',
          'choices': ['skip', 'ignore', 'only'], 'default': 'skip',
          'metavar': 'skip|ignore|only',
          'help': 'Specifies how to treat the skipped tests'}),
        (('-t', '--timeout'),
         {'action': 'store', 'type': 'int', 'dest': 'timeout', 'default': 10,
          'help': 'Time in seconds until a test times out'}),
    )

    option_parser = optparse.OptionParser(usage='usage: %prog [options] [test...]')
    for flags, settings in option_specs:
        option_parser.add_option(*flags, **settings)
    options, args = option_parser.parse_args()

    # The exit status is the number of failed plus timed out test programs.
    sys.exit(TestRunner(options, args).run_tests())
